#include "kvServer.h"

#include <rpcprovider.h>

#include "mprpcconfig.h"
#include "util.h"
#include "Logger_v2.h"

// Dump the contents of the KV database for debugging.
// Currently a no-op beyond locking: everything inside the DEFER block is
// commented out, so when Debug is set this only acquires and releases m_mtx.
void KvServer::DprintfKVDB() {
    if (!Debug) {
        return;
    }
    std::lock_guard<std::mutex> lg(m_mtx);
    DEFER{
        // for (const auto& item : m_kvDB) {
        //     DPrintf("[DBInfo ----]Key : %s, Value : %s", &item.first, &item.second);
        // }
        // m_skipList.display_list();
    };
}

// Apply an "Append" op to the KV store and record the client's request id
// for duplicate detection.
// NOTE(review): this calls m_kvDB->put(), which reads as an overwrite rather
// than an append — unless the database's put() concatenates values, "Append"
// behaves exactly like "Put" here. Confirm against the Database implementation.
void KvServer::ExecuteAppendOpOnKVDB(Op op) {
    // if op.IfDuplicate {   // Get requests can be safely re-executed, so no duplicate check is needed
    //	return
    // }
    // m_mtx.lock();

    m_kvDB->put(op.Key, op.Value);
    // Remember the newest completed request id for this client (dedup state).
    std::lock_guard<std::mutex> lg(m_lastRequestIdMtx);
    m_lastRequestId[op.ClientId] = op.RequestId;

    // m_mtx.unlock();
    // DprintfKVDB();
}

// Look up op.Key in the KV store. *value receives the stored value (empty
// string when absent) and *exist reports whether the key was found. Also
// records this request id as the last one completed for the client, which is
// what ifRequestDuplicate() consults.
void KvServer::ExecuteGetOpOnKVDB(Op op, std::string* value, bool* exist) {
    value->clear();
    *exist = m_kvDB->get(op.Key, *value);

    {
        // Dedup bookkeeping is guarded by its own mutex.
        std::lock_guard<std::mutex> lg(m_lastRequestIdMtx);
        m_lastRequestId[op.ClientId] = op.RequestId;
    }
}

// Apply a "Put" op: store the value, then remember this request as the
// newest completed request for the client (used for duplicate detection).
void KvServer::ExecutePutOpOnKVDB(Op op) {
    m_kvDB->put(op.Key, op.Value);

    {
        std::lock_guard<std::mutex> lg(m_lastRequestIdMtx);
        m_lastRequestId[op.ClientId] = op.RequestId;
    }
}

// Handle a Get RPC from a clerk.
// Flow: submit a "Get" Op to raft via Start(); if this node is the leader,
// wait (bounded by CONSENSUS_TIMEOUT) on a per-log-index queue for the entry
// to be applied, then execute the read against the local KV DB and fill the
// reply. On timeout, a request that was already executed (duplicate) may be
// safely re-read; otherwise the clerk is told ErrWrongLeader so it retries
// against another node.
void KvServer::Get(const raftKVRpcProctoc::GetArgs* args, raftKVRpcProctoc::GetReply* reply) {
    Op op;
    op.Operation = "Get";
    op.Key = args->key();
    op.Value = "";
    op.ClientId = args->clientid();
    op.RequestId = args->requestid();

    int newLogIndex = -1;
    int _ = -1;
    bool isLeader = false;
    // Time how long submitting the new log entry takes (microseconds).
    auto start = std::chrono::high_resolution_clock::now();
    // newLogIndex: the log index raft expects this entry to land at. It is an
    // estimate, but accurate in the normal case; the Op payload is opaque to raft.
    m_raftNode->Start(op, &newLogIndex, &_, &isLeader);
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    LOG << "KvServer::Get向raft节点发送新日志耗时：newLogIndex : " << newLogIndex << " , ClientId : " << op.ClientId << " , RequestId : " << op.RequestId << " , Duration : " << duration << "us";

    if (!isLeader) {
        reply->set_err(ErrWrongLeader);
        return;
    }

    // Create (or reuse) the wait queue for this log index; the apply loop
    // pushes the committed Op into it via SendMessageToWaitChan().
    waitApplyChMtx.lock();
    if (waitApplyCh.find(newLogIndex) == waitApplyCh.end()) {
        waitApplyCh.insert(std::make_pair(newLogIndex, new LockQueue<Op>()));
    }
    auto chForRaftIndex = waitApplyCh[newLogIndex];
    waitApplyChMtx.unlock();


    // Time the wait for the apply notification.
    start = std::chrono::high_resolution_clock::now();
    // Wait for the committed Op, bounded by CONSENSUS_TIMEOUT.
    Op raftCommitOp;
    if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
        // Raft did not confirm the command in time (e.g. lost leadership or
        // consensus stalled); re-check leadership before deciding.
        int _ = -1;
        bool isLeader = false;
        m_raftNode->GetState(&_, &isLeader);

        if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) {
            // On timeout the cluster does not guarantee this entry committed,
            // but a Get that already executed once can be re-executed without
            // violating linearizability.
            std::string value;
            bool exist = false;
            ExecuteGetOpOnKVDB(op, &value, &exist);
            if (exist) {
                reply->set_err(OK);
                reply->set_value(value);
            }
            else {
                reply->set_err(ErrNoKey);
                reply->set_value("");
            }
        }
        else {
            reply->set_err(ErrWrongLeader);  // tells the clerk to retry with another node
        }
    }
    else {
        // Raft committed an entry at this index; verify it is still OUR op —
        // a leader change could have overwritten the index with a different
        // client's entry.
        if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
            std::string value;
            bool exist = false;
            ExecuteGetOpOnKVDB(op, &value, &exist);
            if (exist) {
                reply->set_err(OK);
                reply->set_value(value);
            }
            else {
                reply->set_err(ErrNoKey);
                reply->set_value("");
            }
        }
        else {
            reply->set_err(ErrWrongLeader);
            //            DPrintf("[GET ] 不满足：raftCommitOp.ClientId{%v} == op.ClientId{%v} && raftCommitOp.RequestId{%v}
            //            == op.RequestId{%v}", raftCommitOp.ClientId, op.ClientId, raftCommitOp.RequestId, op.RequestId)
        }
    }
    end = std::chrono::high_resolution_clock::now();
    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    LOG << "获取raftApply消息耗时：raftIndex : " << newLogIndex << " , ClientId : " << op.ClientId << " , RequestId : " << op.RequestId << " , Duration : " << duration << "us";

    // Tear down the wait queue for this index (erase + delete both happen
    // under waitApplyChMtx, the same lock SendMessageToWaitChan holds).
    waitApplyChMtx.lock();
    auto tmp = waitApplyCh[newLogIndex];
    waitApplyCh.erase(newLogIndex);
    delete tmp;
    waitApplyChMtx.unlock();

}

// Apply one committed raft command to the state machine, then wake any RPC
// handler waiting on that log index.
void KvServer::GetCommandFromRaft(ApplyMsg message) {
    Op op;
    op.parseFromString(message.Command);

    LOG << "[KvServer::GetCommandFromRaft-kvserver{" << m_me << "}] , Got Command --> Index:{" << message.CommandIndex << "} , ClientId {" << op.ClientId << "}, RequestId {" << op.RequestId << "}, Opreation {" << op.Operation << "}, Key :{" << op.Key << "}, Value :{" << op.Value << "}";

    // Entries at or below the latest snapshot index were already applied.
    if (message.CommandIndex <= m_lastSnapShotRaftLogIndex) {
        return;
    }

    // The state machine (KvServer) is responsible for deduplication: a
    // command seen before is not executed again. The Executexxx helpers
    // update m_lastRequestId themselves.
    const bool alreadyExecuted = ifRequestDuplicate(op.ClientId, op.RequestId);
    if (!alreadyExecuted) {
        if (op.Operation == "Put") {
            ExecutePutOpOnKVDB(op);
        } else if (op.Operation == "Append") {
            ExecuteAppendOpOnKVDB(op);
        }
    }

    // If a log-size limit is configured, consider handing raft a snapshot.
    if (m_maxRaftState != -1) {
        IfNeedToSendSnapShotCommand(message.CommandIndex, 9);
    }

    // Notify the RPC handler (if any) waiting on this log index.
    SendMessageToWaitChan(op, message.CommandIndex);
}

// Returns true iff (ClientId, RequestId) has already been executed, i.e. the
// request id is not newer than the last id recorded for that client.
// Thread-safe: guarded by m_lastRequestIdMtx.
bool KvServer::ifRequestDuplicate(std::string ClientId, int RequestId) {
    std::lock_guard<std::mutex> lg(m_lastRequestIdMtx);
    // Single hash lookup via the iterator; the previous code did find() and
    // then operator[], hashing the key twice.
    auto it = m_lastRequestId.find(ClientId);
    if (it == m_lastRequestId.end()) {
        // First request ever seen from this client — cannot be a duplicate.
        return false;
    }
    return RequestId <= it->second;
}

// Put/Append differ from Get in how re-execution is handled:
// PutAppend executes only after the raft apply message arrives, and the apply
// path checks idempotence (duplicates are not re-executed); a Get may be
// re-executed whether or not it is a duplicate.
// Flow mirrors Get(): submit the Op via Start(); if leader, wait (bounded by
// CONSENSUS_TIMEOUT) on the per-log-index queue, then answer the clerk.
void KvServer::PutAppend(const raftKVRpcProctoc::PutAppendArgs* args, raftKVRpcProctoc::PutAppendReply* reply) {
    Op op;
    op.Operation = args->op();
    op.Key = args->key();
    op.Value = args->value();
    op.ClientId = args->clientid();
    op.RequestId = args->requestid();
    int raftIndex = -1;
    int _ = -1;
    bool isleader = false;
    // Time how long submitting the new log entry takes (microseconds).
    auto start = std::chrono::high_resolution_clock::now();

    m_raftNode->Start(op, &raftIndex, &_, &isleader);
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    LOG << "KvServer::PutAppend向Raft节点{" << m_me << "}发送新日志耗时：" << duration<<" us";

    if (!isleader) {
        LOG << "[func -KvServer::PutAppend -kvserver{" << m_me << "}]From Client " << args->clientid() << " (Request " << args->requestid() << ") To Server " << m_me << ", key " << op.Key << ", raftIndex " << raftIndex << " , but not leader";

        reply->set_err(ErrWrongLeader);
        return;
    }

    // Create (or reuse) the wait queue for this log index; the apply loop
    // pushes the committed Op into it via SendMessageToWaitChan().
    waitApplyChMtx.lock();
    if (waitApplyCh.find(raftIndex) == waitApplyCh.end()) {
        waitApplyCh.insert(std::make_pair(raftIndex, new LockQueue<Op>()));
    }
    auto chForRaftIndex = waitApplyCh[raftIndex];
    waitApplyChMtx.unlock();  // unlock before the blocking wait to avoid deadlock

    // Time the wait for the apply notification.
    start = std::chrono::high_resolution_clock::now();
    // Wait for the committed Op, bounded by CONSENSUS_TIMEOUT.
    Op raftCommitOp;

    if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
        // DPrintf(
        //     "[func -KvServer::PutAppend -kvserver{%d}]TIMEOUT PUTAPPEND !!!! Server %d , get Command <-- Index:%d , "
        //     "ClientId %s, RequestId %s, Opreation %s Key :%s, Value :%s",
        //     m_me, m_me, raftIndex, &op.ClientId, op.RequestId, &op.Operation, &op.Key, &op.Value);

        LOG << "[func -KvServer::PutAppend -kvserver{" << m_me << "}]TIMEOUT PUTAPPEND !!!! Server " << m_me << " , get Command <-- Index:" << raftIndex << " , ClientId " << op.ClientId << ", RequestId " << op.RequestId << ", Opreation " << op.Operation << ", Key :" << op.Key << ", Value :" << op.Value;
        if (ifRequestDuplicate(op.ClientId, op.RequestId)) {
            reply->set_err(OK);  // timed out, but the request was already executed, so OK; even without a timeout the apply path rechecks for duplicates
        }
        else {
            reply->set_err(ErrWrongLeader);  // makes the clerk retry against another node
        }
    }
    else {

        LOG << "[func -KvServer::PutAppend -kvserver{" << m_me << "}]WaitChanGetRaftApplyMessage<--Server " << m_me << " , get Command <-- Index:" << raftIndex << " , ClientId " << op.ClientId << ", RequestId " << op.RequestId << ", Opreation " << op.Operation << ", Key :" << op.Key << ", Value :" << op.Value;
        if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
            // A leader change may have overwritten this index with a
            // different entry, so the committed Op must be checked.
            reply->set_err(OK);
        }
        else {
            reply->set_err(ErrWrongLeader);
        }
    }
    end = std::chrono::high_resolution_clock::now();
    duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    LOG << "获取raftApply消息耗时：raftIndex : " << raftIndex << " , ClientId : " << op.ClientId << " , RequestId : " << op.RequestId << " , Duration : " << duration << "us";

    // Tear down the wait queue for this index (erase + delete both happen
    // under waitApplyChMtx, the same lock SendMessageToWaitChan holds).
    waitApplyChMtx.lock();
    auto tmp = waitApplyCh[raftIndex];
    waitApplyCh.erase(raftIndex);
    delete tmp;
    waitApplyChMtx.unlock();

}

// Dedicated apply loop: blocks on applyChan (which carries its own internal
// lock, so no external synchronization is needed) and dispatches each message
// to the command handler and/or the snapshot handler. Never returns.
void KvServer::ReadRaftApplyCommandLoop() {
    // Helper: microseconds elapsed since `from`.
    auto elapsedUs = [](std::chrono::high_resolution_clock::time_point from) {
        return std::chrono::duration_cast<std::chrono::microseconds>(
                   std::chrono::high_resolution_clock::now() - from)
            .count();
    };

    while (true) {
        auto message = applyChan->Pop();  // blocking pop of the next raft apply message

        // message.CommandIndex is raft's logIndex; message.Command is the
        // serialized log entry.
        auto cmdStart = std::chrono::high_resolution_clock::now();
        if (message.CommandValid) {
            GetCommandFromRaft(message);
        }
        LOG << "KvServer::GetCommandFromRaft消息耗时：raftIndex : " << message.CommandIndex << " , Duration : " << elapsedUs(cmdStart) << "us";

        auto snapStart = std::chrono::high_resolution_clock::now();
        if (message.SnapshotValid) {
            GetSnapShotFromRaft(message);
        }
        LOG << "KvServer::GetSnapShotFromRaft消息耗时：raftIndex : " << message.SnapshotIndex << " , Duration : " << elapsedUs(snapStart) << "us";
    }
}

// Both raft and kvserver talk to the persist layer: kvserver needs it to
// restore the KV DB state on startup, and raft stores/reads snapshots when
// persisting and when building leader InstallSnapshot RPCs. The snapshot
// format is therefore defined by the kvserver layer — raft only transports
// it. A snapshot holds the kvserver's persisted lastRequestId map plus the
// actual KV data.
void KvServer::ReadSnapShotToInstall(std::string snapshot) {
    // An empty snapshot means there is no persisted state to restore
    // (fresh bootstrap), so there is nothing to do.
    if (!snapshot.empty()) {
        parseFromString(snapshot);
    }
}

// Deliver a committed Op to the RPC handler waiting on raftIndex, if any.
// Returns false when no handler is waiting (e.g. the entry was replicated on
// a follower, or the waiting handler already timed out and removed its queue).
// Thread-safe: the lookup and the Push happen under waitApplyChMtx, the same
// lock Get()/PutAppend() hold when erasing and deleting the queue.
bool KvServer::SendMessageToWaitChan(const Op& op, int raftIndex) {
    std::lock_guard<std::mutex> lg(waitApplyChMtx);

    LOG << "[RaftApplyMessageSendToWaitChan--> raftserver{" << m_me << "}] , Send Command --> Index:{" << raftIndex << "} , ClientId {" << op.ClientId << "}, RequestId {" << op.RequestId << "}, Opreation {" << op.Operation << "}, Key :{" << op.Key << "}, Value :{" << op.Value << "}";

    // Single map lookup via the iterator; the previous code did find() and
    // then operator[], looking the key up twice.
    auto it = waitApplyCh.find(raftIndex);
    if (it == waitApplyCh.end()) {
        return false;
    }
    it->second->Push(op);

    return true;
}

// Decide whether the raft log has grown large enough to warrant making a
// snapshot of the KV state and handing it to raft.
// NOTE(review): the unconditional `return;` below disables snapshotting
// entirely — everything after it is dead code. If that is intentional (the
// feature is switched off), consider removing the dead body; otherwise remove
// the early return. Also note the `proportion` parameter is unused: the
// threshold is hard-coded as m_maxRaftState / 10.0. Confirm intent before
// changing behavior.
void KvServer::IfNeedToSendSnapShotCommand(int raftIndex, int proportion) {
    return;
    if (m_raftNode->GetRaftStateSize() > m_maxRaftState / 10.0) {
        // Send SnapShot Command
        auto snapshot = MakeSnapShot();
        m_raftNode->Snapshot(raftIndex, snapshot);
    }
}

// Handle a snapshot delivered through the apply channel: ask raft whether
// this snapshot may be installed, and if so, replace the local KV state with
// it and advance the last-snapshot log index.
void KvServer::GetSnapShotFromRaft(ApplyMsg message) {
    std::lock_guard<std::mutex> lock(m_mtx);

    const bool installable =
        m_raftNode->CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot);
    if (installable) {
        ReadSnapShotToInstall(message.Snapshot);
        m_lastSnapShotRaftLogIndex = message.SnapshotIndex;
    }
}

std::string KvServer::MakeSnapShot() {
    std::lock_guard<std::mutex> lg(m_mtx);
    std::string snapshotData = getSnapshotData();
    return snapshotData;
}

// protobuf RPC entry point for clerk PutAppend calls: delegate to the
// internal handler, log the elapsed time (microseconds), and complete the RPC.
void KvServer::PutAppend(google::protobuf::RpcController* controller, const ::raftKVRpcProctoc::PutAppendArgs* request,
    ::raftKVRpcProctoc::PutAppendReply* response, ::google::protobuf::Closure* done) {
    const auto begin = std::chrono::high_resolution_clock::now();
    PutAppend(request, response);
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - begin)
            .count();
    LOG << "KvServer::PutAppend（clerk的rpc调用）耗时：" << elapsed << " us";
    done->Run();
}

// protobuf RPC entry point for clerk Get calls: delegate to the internal
// handler, log the elapsed time (microseconds), and complete the RPC.
void KvServer::Get(google::protobuf::RpcController* controller, const ::raftKVRpcProctoc::GetArgs* request,
    ::raftKVRpcProctoc::GetReply* response, ::google::protobuf::Closure* done) {
    const auto begin = std::chrono::high_resolution_clock::now();
    Get(request, response);
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - begin)
            .count();
    LOG << "KvServer::Get（clerk的rpc调用）耗时：" << elapsed << " us";
    done->Run();
}

// Construct a KvServer: create the KV database, publish the RPC services for
// both clerk and raft-to-raft traffic, connect to the other raft nodes listed
// in nodeInforFileName, initialize the raft node, and restore state from a
// persisted snapshot if one exists.
// NOTE: this constructor never returns — it joins the apply-loop thread at
// the end so the process stays alive.
KvServer::KvServer(int me, int maxraftstate, std::string nodeInforFileName, short port)
// :m_skipList(6)
{
    // m_skipList = SkipList<std::string, std::string>(6);
    m_kvDB = DatabaseFactory<std::string, std::string>::create(DatabaseType::HashMap);

    std::shared_ptr<Persister> persister = std::make_shared<Persister>(me);

    m_me = me;
    m_maxRaftState = maxraftstate;

    applyChan = std::make_shared<LockQueue<ApplyMsg> >();

    m_raftNode = std::make_shared<Raft>();

    // Publish both the clerk-facing KV service and the raft peer service on
    // one RpcProvider. provider.Run() blocks, so it lives on its own thread.
    std::thread t([this, port]() -> void {
        RpcProvider provider;
        provider.NotifyService(this);
        provider.NotifyService(this->m_raftNode.get());  // TODO: raw pointer handed out — verify its lifetime vs the shared_ptr owner
        provider.Run(m_me, port);
    });
    t.detach();

    // Every node must be accepting RPCs before anyone dials out; a fixed
    // sleep is used here as a crude start-up barrier.
    std::cout << "raftServer node:" << m_me << " start to sleep to wait all ohter raftnode start!!!!" << std::endl;
    sleep(6);
    std::cout << "raftServer node:" << m_me << " wake up!!!! start to connect other raftnode" << std::endl;

    // Read node0ip/node0port, node1ip/node1port, ... from the config file
    // until a node entry is missing.
    MprpcConfig config;
    config.LoadConfigFile(nodeInforFileName.c_str());
    std::vector<std::pair<std::string, short> > ipPortVt;
    for (int i = 0; ; ++i) {
        std::string node = "node" + std::to_string(i);

        std::string nodeIp = config.Load(node + "ip");
        std::string nodePortStr = config.Load(node + "port");
        if (nodeIp.empty()) {
            break;
        }
        // No stoi-style helper available here; atoi with an explicit cast.
        ipPortVt.emplace_back(nodeIp, static_cast<short>(atoi(nodePortStr.c_str())));
    }

    // Connect to every other raft node; the slot for this node stays nullptr.
    std::vector<std::shared_ptr<RaftRpcUtil> > servers;
    for (size_t i = 0; i < ipPortVt.size(); ++i) {
        if (static_cast<int>(i) == m_me) {
            servers.push_back(nullptr);
            continue;
        }
        // make_shared instead of raw new + shared_ptr(ptr): exception-safe
        // and a single allocation.
        servers.push_back(std::make_shared<RaftRpcUtil>(ipPortVt[i].first, ipPortVt[i].second));

        std::cout << "node" << m_me << " 连接node" << i << "success!" << std::endl;
    }

    // Stagger start-up so all nodes finish connecting before raft begins.
    sleep(ipPortVt.size() - me);
    // kvserver talks to raft only through the ApplyMsg channel; both layers
    // share the same persister.
    m_raftNode->init(servers, m_me, persister, applyChan);

    m_lastSnapShotRaftLogIndex = 0;  // TODO: could this just read raft's snapshot index directly?
    auto snapshot = persister->ReadSnapshot();
    if (!snapshot.empty()) {
        ReadSnapShotToInstall(snapshot);
    }

    // ReadRaftApplyCommandLoop never exits; joining here keeps the process
    // (and the detached RPC thread) alive for the server's lifetime.
    std::thread t2(&KvServer::ReadRaftApplyCommandLoop, this);
    t2.join();
}
